#include "config.h"

#include <fcntl.h>
#include <sys/types.h>
#include <unistd.h>

#define DBG_SUBSYS S_LIBSTORAGE

#include "dbg.h"
#include "volume_proto.h"

#include "lsv_wbuffer.h"
#include "lsv_wbuffer_internal.h"

/**
 * Read `size` bytes starting at byte offset `off` from chunk `chunk_id`
 * into caller-supplied `buf`. Thin wrapper around
 * lsv_volume_chunk_read_data with error logging.
 *
 * Returns 0 on success, the storage error code otherwise.
 */
int _load_chunk_page(lsv_volume_proto_t *lsv_info, uint32_t chunk_id, uint32_t off, uint32_t size, lsv_s8_t *buf) {
        int ret;

        ret = lsv_volume_chunk_read_data(lsv_info, LSV_THIS_VOL_INO, chunk_id, off, size, buf);
        if (unlikely(ret)) {
                // BUG FIX: previous message ("upload volgc stroage err") was
                // copy-pasted from an upload path and misdescribed this read.
                DERROR("load chunk %u [off %u size %u] err,errno:%d\n", chunk_id, off, size, ret);
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

/**
 * Load a whole chunk (LSV_CHUNK_SIZE bytes) into a freshly allocated buffer.
 * On success *_buf owns the allocation (caller must yfree); on failure
 * *_buf is NULL and the error code is returned.
 */
int _load_chunk(lsv_volume_proto_t *lsv_info, uint32_t chunk_id, void **_buf) {
        int ret;
        lsv_s8_t *buf;

        *_buf = NULL;

        ret = ymalloc((void **)&buf, LSV_CHUNK_SIZE);
        if (unlikely(ret))
                YASSERT(0);

        ret = lsv_volume_chunk_read(lsv_info, chunk_id, buf);
        if (unlikely(ret)) {
                DERROR("load chunk %u err,errno:%d\n", chunk_id, ret);
                // BUG FIX: the original leaked `buf` on this path.
                GOTO(err_free, ret);
        }

        *_buf = buf;
        return 0;
err_free:
        yfree((void **)&buf);
        return ret;
}

/**
 * Persist a checkpoint record (head chunk + last committed LSN) into the
 * master record area of the primary chunk. The record is padded up to a
 * whole number of pages before being written.
 */
int _encode_checkpoint(lsv_volume_proto_t *lsv_info, uint32_t head_chunk, uint64_t last_lsn) {
        int ret;
        uint32_t raw_len, padded_len;
        checkpoint_t *cp;

        // round the record length up to a page multiple
        raw_len = sizeof(checkpoint_t);
        padded_len = raw_len;
        if (raw_len % LSV_PAGE_SIZE != 0)
                padded_len += LSV_PAGE_SIZE - raw_len % LSV_PAGE_SIZE;
        YASSERT(padded_len % LSV_PAGE_SIZE == 0);

        ret = ymalloc((void **)&cp, padded_len);
        if (unlikely(ret))
                YASSERT(0);
        memset(cp, 0, padded_len);

        // fill in the on-disk checkpoint fields
        cp->magic = WAL_HEAD_MAGIC;
        cp->version = WAL_HEAD_VERSION;
        cp->head_chunk = head_chunk;
        cp->last_lsn = last_lsn;

        // dump to the checkpoint slot of the primary chunk
        uint32_t off = lsv_info->u.wbuf_page_id * LSV_PAGE_SIZE;
        lsv_volume_io_t vio;
        lsv_volume_io_init(&vio, LSV_PRIM_CHUNK_ID, off, padded_len, LSV_WBUFFER_STORAGE_TYPE);
        DINFO("dump chunk0 [off %u size %u]: head_chunk %u lsn %llu\n",
              off,
              padded_len,
              head_chunk,
              (LLU)last_lsn);

        ret = lsv_volume_chunk_update(lsv_info, &vio, (lsv_s8_t *)cp);
        if (unlikely(ret)) {
                DERROR("PERSISTENT WBUF'S CHUNK_ID INFO FAILED!!!\n");
                GOTO(err_ret, ret);
        }

        yfree((void **)&cp);
        return 0;
err_ret:
        yfree((void **)&cp);
        return ret;
}

/**
 * Read the persisted checkpoint record back from the primary chunk.
 * If the on-disk page fails the magic/version check, *_cp is returned
 * zero-filled (treated as "no checkpoint yet") rather than as an error.
 */
int _decode_checkpoint(lsv_volume_proto_t *lsv_info, checkpoint_t *_cp) {
        int ret;
        lsv_s8_t buf[LSV_PAGE_SIZE];
        checkpoint_t *cp;

        memset(_cp, 0, sizeof(checkpoint_t));

        // checkpoint lives at page wbuf_page_id of the primary chunk
        uint32_t off = lsv_info->u.wbuf_page_id * LSV_PAGE_SIZE;
        ret = _load_chunk_page(lsv_info, LSV_PRIM_CHUNK_ID, off, LSV_PAGE_SIZE, buf);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        // NOTE(review): casts a stack byte array to checkpoint_t* — assumes
        // buf is sufficiently aligned for checkpoint_t; confirm.
        cp = (checkpoint_t *)buf;
        if (cp->magic != WAL_HEAD_MAGIC || cp->version != WAL_HEAD_VERSION) {
                // invalid/uninitialized record: degrade to an all-zero checkpoint
                memset(buf, 0, LSV_PAGE_SIZE);
        }

        DINFO("load chunk0 [off %u size %u]: head_chunk %u lsn %llu\n",
              off,
              LSV_PAGE_SIZE,
              cp->head_chunk,
              (LLU)cp->last_lsn);

        *_cp = *cp;
        return 0;
err_ret:
        return ret;
}

/**
 * Load the persisted chunk-id list (segment_head_t) from the master record.
 * Reads one page first to validate the header and learn head_len, then
 * re-reads the full head. On success *_seg_head owns a heap allocation the
 * caller must yfree; on failure *_seg_head is NULL.
 */
int read_chunk_list(lsv_volume_proto_t *lsv_info, segment_head_t **_seg_head) {
        int ret;
        lsv_s8_t buf[LSV_PAGE_SIZE];
        segment_head_t *first_page, *seg_head;

        *_seg_head = NULL;

        // chunk list starts one page after the checkpoint record
        uint32_t off = (lsv_info->u.wbuf_page_id + 1) * LSV_PAGE_SIZE;
        ret = _load_chunk_page(lsv_info, LSV_PRIM_CHUNK_ID, off, LSV_PAGE_SIZE, buf);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        first_page = (segment_head_t *)buf;
        // TODO validate segment head more thoroughly (real CRC check)
        if (first_page->magic != WAL_HEAD_MAGIC || first_page->crc != 0) {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        // BUG FIX: head_len is unsigned, so the original `head_len >= 0` was
        // always true and a zeroed/corrupt head_len of 0 slipped through to a
        // zero-byte allocation below. Require a positive, page-aligned length.
        YASSERT(first_page->head_len > 0 && first_page->head_len % LSV_PAGE_SIZE == 0);

        ret = ymalloc((void **)&seg_head, first_page->head_len);
        if (unlikely(ret))
                YASSERT(0);

        ret = _load_chunk_page(lsv_info, LSV_PRIM_CHUNK_ID, off, first_page->head_len, (lsv_s8_t *)seg_head);
        if (unlikely(ret))
                GOTO(err_free, ret);

        DINFO("load chunk0 [off %u size %u]: segment %u chunk %u\n",
              off,
              first_page->head_len,
              seg_head->segment_num,
              seg_head->chunk_num);

        *_seg_head = seg_head;
        return 0;
err_free:
        yfree((void **)&seg_head);
err_ret:
        return ret;
}

int lsv_wal2_segment_new(lsv_volume_proto_t *lsv_info, uint32_t count, uint32_t *new_chunk_ids,
                         lsv_wbuf_wal_segment_t **_segment);

/* Debug dump of one WAL chunk's replay/allocation state. */
static void _print_chunk(lsv_wbuf_wal_chunk_t *chunk, const char* name) {
        (void)name;   // kept for signature symmetry with _print_segment
        DINFO("\tlsn %llu chunk %d %u next_page %u\n",
              (LLU)chunk->max_lsn, chunk->idx,
              chunk->chunk_id, chunk->next_page);
}

/* Debug dump of a segment header plus every chunk it owns. */
static void _print_segment(lsv_wbuf_wal_segment_t *segment, const char* name) {
        DINFO_NOP("[%s] lsn %llu segment %p chunk %u/%u\n",
              name, (LLU)segment->max_lsn, segment,
              segment->curr_chunk, segment->chunk_num);

        for (int idx = 0; idx < segment->chunk_num; idx++)
                _print_chunk(segment->chunks[idx], name);
}

/* Debug dump of both WAL segment lists (wait, then free) plus their counts. */
static void _print_list(lsv_wbuf_wal_t *wal) {
        struct list_head *pos, *n;
        lsv_wbuf_wal_segment_t *segment;

        // NOTE: the original kept dead `i` counters here; removed as unused.
        list_for_each_safe(pos, n, &wal->wait_list.list) {
                segment = list_entry(pos, lsv_wbuf_wal_segment_t, hook);
                _print_segment(segment, "wait");
        }

        list_for_each_safe(pos, n, &wal->free_list.list) {
                segment = list_entry(pos, lsv_wbuf_wal_segment_t, hook);
                _print_segment(segment, "free");
        }

        DINFO("wait %u free %u\n", wal->wait_list.count, wal->free_list.count);
}

// -- malloc chunk management

/* Per-chunk replay cursor used by replay_segment(): tracks how far a WAL
 * chunk has been consumed and holds its lazily loaded on-disk image. */
typedef struct {
        uint64_t max_lsn;     // highest LSN replayed from this chunk so far
        uint32_t page_idx;    // next unread page index (the water mark)
        void *data;           // lazily loaded chunk image (1M)
} water_mark_t;

/* Count every chunk currently tracked by the WAL, across both the wait
 * and free segment lists. */
static uint32_t _total_chunk_num(const lsv_wbuf_wal_t *wal) {
        struct list_head *pos, *n;
        lsv_wbuf_wal_segment_t *segment;
        uint32_t total = 0;

        list_for_each_safe(pos, n, &wal->wait_list.list) {
                segment = list_entry(pos, lsv_wbuf_wal_segment_t, hook);
                total += segment->chunk_num;
        }

        list_for_each_safe(pos, n, &wal->free_list.list) {
                segment = list_entry(pos, lsv_wbuf_wal_segment_t, hook);
                total += segment->chunk_num;
        }

        return total;
}

/**
 * Serialize the WAL's full chunk-id layout (segment_head_t followed by the
 * flat chunk_ids array, wait-list segments first, then free-list segments)
 * and persist it to the master record area of the primary chunk.
 * Returns 0 on success, the storage error code otherwise.
 */
static int _encode_segment_data(lsv_volume_proto_t *lsv_info) {
        int ret;
        segment_head_t *seg_head;
        uint32_t chunk_num, buflen, buflen2;
        struct list_head *pos, *n;
        lsv_wbuf_wal_segment_t *segment;
        lsv_wbuf_t *wbuf = lsv_info->wbuf;
        lsv_wbuf_wal_t *wal = wbuf->wal;

        chunk_num = _total_chunk_num(wal);
        buflen = sizeof(segment_head_t) + sizeof(uint32_t) * chunk_num;

        // round the buffer up to a whole number of 4K pages
        buflen2 = buflen;
        if (buflen % LSV_PAGE_SIZE != 0) {
                buflen2 += LSV_PAGE_SIZE - buflen % LSV_PAGE_SIZE;
        }

        YASSERT(buflen2 % LSV_PAGE_SIZE == 0);

        ret = ymalloc((void **)&seg_head, buflen2);
        if (unlikely(ret))
                YASSERT(0);

        memset(seg_head, 0, buflen2);
        seg_head->head_len = buflen2;
        seg_head->crc = 0;
        seg_head->magic = WAL_HEAD_MAGIC;
        seg_head->version = WAL_HEAD_VERSION;
        seg_head->segment_num = wal->wait_list.count + wal->free_list.count;
        seg_head->segment_len = WAL_SEGMENT_CHUNK_NUM;
        seg_head->chunk_num = chunk_num;

        {
                // serialization order matters for replay: wait -> free
                int chunk_idx = 0;
                list_for_each_safe(pos, n, &wal->wait_list.list) {
                        segment = list_entry(pos, lsv_wbuf_wal_segment_t, hook);
                        for (int i=0; i < segment->chunk_num; i++) {
                                seg_head->chunk_ids[chunk_idx++] = segment->chunks[i]->chunk_id;
                        }
                }

                list_for_each_safe(pos, n, &wal->free_list.list) {
                        segment = list_entry(pos, lsv_wbuf_wal_segment_t, hook);
                        for (int i=0; i < segment->chunk_num; i++) {
                                seg_head->chunk_ids[chunk_idx++] = segment->chunks[i]->chunk_id;
                        }
                }

                YASSERT(chunk_idx == chunk_num);

                // dump to master record (one page past the checkpoint slot)
                uint32_t off = (lsv_info->u.wbuf_page_id + 1)* LSV_PAGE_SIZE;
                lsv_volume_io_t vio;
                lsv_volume_io_init(&vio, LSV_PRIM_CHUNK_ID, off, buflen2, LSV_WBUFFER_STORAGE_TYPE);
                DINFO("dump chunk0 [off %u size %u]: segment %u chunk %u\n",
                      off,
                      buflen2,
                      seg_head->segment_num,
                      chunk_num);
                ret = lsv_volume_chunk_update(lsv_info, &vio, (lsv_s8_t *)seg_head);
                if (unlikely(ret)){
                        DERROR("PERSISTENT WBUF'S CHUNK_ID INFO FAILED!!!\n");
                        GOTO(err_ret, ret);
                }
        }

        yfree((void **)&seg_head);
        return 0;
err_ret:
        yfree((void **)&seg_head);
        return ret;
}

/**
 * Replay one WAL segment (count chunks, ids in chunk_ids) during recovery:
 * walk the segment's pages, redo every IO with LSN newer than commit_lsn
 * into memory, and finally register the segment on either the wait list
 * (still holds uncommitted IO) or the free list (past the redo tail).
 * *stop is shared across segments and flips to 1 once the redo tail is found.
 */
static int replay_segment(lsv_volume_proto_t *lsv_info, int seg_idx, uint32_t count, uint32_t *chunk_ids,
                          uint64_t commit_lsn, int *stop) {
        int ret;
        lsv_wbuf_t *wbuf = lsv_info->wbuf;

        water_mark_t water_marks[WAL_SEGMENT_CHUNK_NUM];
        int next_chk = 0;
        uint64_t max_lsn = 0;
        int full_count = 0;
        int io_count = 0;

        wal_io_head_t *ioctx;
        wal_io_frag_t *frag;
        lsv_s8_t *new_buf;

        // TODO
        YASSERT(count == WAL_SEGMENT_CHUNK_NUM);

        // Assumption 1: every segment starts with a water mark of 0
        for (int i=0; i < count; i++) {
                // DINFO("replay chunk %d\n", chunk_ids[i]);
                water_marks[i].max_lsn = 0;
                water_marks[i].page_idx = 0;
                water_marks[i].data = NULL;
        }

        // *stop is the watershed: it marks that the tail needing REDO was found.
        // All segments before it go onto the wait list, all segments after it
        // go onto the free list, preserving the ring-queue invariant.
        while (*stop == 0) {
                if (full_count == WAL_SEGMENT_CHUNK_NUM) {
                        DINFO("full\n");
                        break;
                }

                if (water_marks[next_chk].page_idx == LSV_WBUF_PAGE_NUM) {
                        next_chk = (next_chk + 1) % WAL_SEGMENT_CHUNK_NUM;
                        continue;
                }

                // lazily load the chunk image on first touch
                if (water_marks[next_chk].data == NULL) {
                        ret = _load_chunk(lsv_info, chunk_ids[next_chk], &water_marks[next_chk].data);
                        if (unlikely(ret))
                                YASSERT(0);
                }

                DINFO("load seg %u chunk %u %u page %u\n",
                      seg_idx,
                      next_chk,
                      chunk_ids[next_chk],
                      water_marks[next_chk].page_idx);
                ioctx = (wal_io_head_t *)(water_marks[next_chk].data + water_marks[next_chk].page_idx * LSV_PAGE_SIZE);

                // TODO: if a segment was skipped (not full, but remaining space was
                // too small for an IO), how should it be handled?
                // Put on the wait list, or the free list?
                DINFO("seg %d chunk %u %u magic %u %u\n",
                      seg_idx,
                      next_chk,
                      chunk_ids[next_chk],
                      ioctx->magic,
                      WAL_HEAD_MAGIC);
                if (ioctx->magic != WAL_HEAD_MAGIC) {
                        DINFO("magic %u\n", ioctx->magic);
                        break;
                }

                // check the LSN condition: LSNs must increase monotonically
                DINFO("seg %d chunk %u %u commit %llu < max_lsn %llu < lsn %llu\n",
                      seg_idx,
                      next_chk,
                      chunk_ids[next_chk],
                      (LLU)commit_lsn,
                      (LLU)max_lsn,
                      (LLU)ioctx->lsn);
                if (ioctx->lsn < commit_lsn) {
                        DINFO("stop 1, lsn %llu, commit_lsn %llu\n", (LLU)ioctx->lsn, (LLU)commit_lsn);
                        *stop = 1;
                        break;
                }

                // TODO max lsn
                if (max_lsn == 0) {
                        max_lsn = ioctx->lsn;
                }
                if (ioctx->lsn < max_lsn) {
                        DINFO("stop 2, lsn %llu, max_lsn %llu\n", (LLU)ioctx->lsn, (LLU)max_lsn);
                        *stop = 1;
                        break;
                }
                YASSERT(ioctx->lsn >= max_lsn);
                max_lsn = ioctx->lsn;

                // rebuild the in-memory IO from the WAL header
                io_t io;
                io_init(&io, NULL, NULL, ioctx->lba, ioctx->size, 0);
                io.lsn = ioctx->lsn;

                buffer_t mbuf;
                mbuffer_init(&mbuf, 0);

                // frags may be non-contiguous across the chunk array
                for (int i=0; i < ioctx->frag_num; i++) {
                        frag = &ioctx->frags[i];

                        DINFO("lsn %llu seg %u load frag %d/%d chunk %u %u page %u %u off %llu %u\n",
                              (LLU)ioctx->lsn,
                              seg_idx,
                              i,
                              ioctx->frag_num,
                              frag->chunk_idx,
                              frag->chunk_id,
                              frag->page_idx,
                              frag->page_num,
                              (LLU)frag->off,
                              frag->size);

                        if (water_marks[frag->chunk_idx].data == NULL) {
                                ret = _load_chunk(lsv_info, frag->chunk_id, &water_marks[frag->chunk_idx].data);
                                if (unlikely(ret))
                                        YASSERT(0);
                        }

                        new_buf = water_marks[frag->chunk_idx].data + LSV_PAGE_SIZE * frag->page_idx;

                        // TODO > 512K
                        if (i == 0) {
                                // first frag carries the WAL header; skip past it
                                YASSERT(frag->chunk_idx == next_chk);
                                YASSERT(frag->chunk_id == chunk_ids[next_chk]);

                                int hl = wal_io_head_len();
                                mbuffer_appendmem(&mbuf, new_buf + hl, frag->size - hl);
                        } else {
                                // TODO use mbuffer_attach
                                mbuffer_appendmem(&mbuf, new_buf, frag->size);
                        }

                        if (water_marks[frag->chunk_idx].max_lsn < ioctx->lsn) {
                                water_marks[frag->chunk_idx].max_lsn = ioctx->lsn;
                        }
                        water_marks[frag->chunk_idx].page_idx += frag->page_num;
                        YASSERT(water_marks[frag->chunk_idx].page_idx <= LSV_WBUF_PAGE_NUM);
                        if (water_marks[frag->chunk_idx].page_idx == LSV_WBUF_PAGE_NUM) {
                                full_count++;
                        }

                        // last frag decides where the scan resumes
                        if (i == ioctx->frag_num - 1) {
                                // supports either normal writes or spiral writes
                                if (lsv_feature & LSV_FEATURE_SPIRAL_WRITE) {
                                        next_chk = (frag->chunk_idx + 1) % WAL_SEGMENT_CHUNK_NUM;
                                } else {
                                        next_chk = frag->chunk_idx;
                                }
                        }
                }

                // perform the REDO
                frag = &ioctx->frags[0];
                DINFO("REDO vol %llu lsn %llu seg %u chunk %u %u next_chk %u %u off %llu %u %u\n",
                      (LLU)lsv_info->ino,
                      (LLU)ioctx->lsn, seg_idx,
                      frag->chunk_idx, frag->chunk_id,
                      next_chk, chunk_ids[next_chk],
                      (LLU)io.offset, io.size, mbuf.len);

                // TODO
                YASSERT(mbuf.len == ioctx->size);
                volume_proto_mem_append_io(lsv_info, &io, &mbuf);

                mbuffer_free(&mbuf);

                // advance the global LSN
                io_count++;
                if (wbuf->last_lsn < ioctx->lsn) {
                        wbuf->last_lsn = ioctx->lsn;
                }
        }

        // register the replayed segment with the WAL lists
        {
                lsv_wbuf_wal_segment_t *segment;
                lsv_wal2_segment_new(lsv_info, count, chunk_ids, &segment);
                YASSERT(segment);

                DINFO("max_lsn %llu seg %u chunk %u %u io %u full_count %u\n",
                      (LLU)max_lsn,
                      seg_idx,
                      next_chk,
                      chunk_ids[next_chk],
                      io_count,
                      full_count);
                segment->max_lsn = max_lsn;
                segment->curr_chunk = next_chk;

                for (int i=0; i < WAL_SEGMENT_CHUNK_NUM; i++) {
                        if (water_marks[i].data != NULL) {
                                yfree(&water_marks[i].data);
                        }

                        if (water_marks[i].page_idx) {
                                DINFO("seg %u chunk %u %u page %u\n", seg_idx, i, chunk_ids[i], water_marks[i].page_idx);
                        }
                        // set each chunk's water mark
                        segment->chunks[i]->max_lsn = water_marks[i].max_lsn;
                        segment->chunks[i]->next_page = water_marks[i].page_idx;
                }

                // TODO: would putting everything on the wait list hurt correctness?
                if (!*stop) {
                        DINFO("add segment %p to wait list\n", segment);
                        count_list_add_tail(&segment->hook, &wbuf->wal->wait_list);
                } else {
                        DINFO("add segment %p to free list\n", segment);
                        count_list_add_tail(&segment->hook, &wbuf->wal->free_list);
                }
        }

        DINFO("vol %llu wbuf lsn %llu wait %u free %u\n",
              (LLU)lsv_info->ino,
              (LLU)wbuf->last_lsn,
              wbuf->wal->wait_list.count,
              wbuf->wal->free_list.count);
        return 0;
}

/**
 * Recovery driver: load the persisted chunk layout, locate the segment the
 * checkpoint points at, then replay every segment in ring order starting
 * from there. Returns 0 on success, an error code otherwise.
 */
static int _decode_segment_data(lsv_volume_proto_t *lsv_info, const checkpoint_t *cp) {
        int ret;
        lsv_wbuf_t *wbuf = lsv_info->wbuf;
        segment_head_t *seg_head;

        ret = read_chunk_list(lsv_info, &seg_head);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (seg_head == NULL) {
                // BUG FIX: the original jumped to err_ret with ret still 0,
                // silently reporting success on a missing segment head.
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        // find the segment whose first chunk matches the checkpoint head
        // (head_chunk == 0 means "no checkpoint": start from the beginning)
        int cursor = -1;
        for (int i=0; i < seg_head->segment_num; i++) {
                if (cp->head_chunk == 0 || cp->head_chunk == seg_head->chunk_ids[i*seg_head->segment_len]) {
                        cursor = i;
                        break;
                }
        }

        YASSERT(cursor >= 0);

        // TODO
        wbuf->last_lsn = cp->last_lsn;

        // replay segment by segment around the ring, starting at cursor
        int idx, stop = 0;
        for (int i=0; i < seg_head->segment_num; i++) {
                idx = (cursor + i) % seg_head->segment_num;
                replay_segment(lsv_info, i, seg_head->segment_len, &seg_head->chunk_ids[idx * seg_head->segment_len],
                               cp->last_lsn,
                               &stop);
        }

        _print_list(wbuf->wal);

        yfree((void **)&seg_head);
        return 0;
err_ret:
        return ret;
}

/**
 * Allocate and initialize the WAL bookkeeping structure (empty wait/free
 * lists, allocation lock, full-condition) and attach it to the wbuf.
 */
int lsv_wal2_segment_init(lsv_volume_proto_t *lsv_info) {
        int ret;
        lsv_wbuf_wal_t *wal = NULL;

        ret = ymalloc((void **)&wal, sizeof(lsv_wbuf_wal_t));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        count_list_init(&wal->wait_list);
        count_list_init(&wal->free_list);
        lsv_rwlock_init(&wal->malloc_rwlock);
        co_cond_init(&wal->full_cond);

        lsv_info->wbuf->wal = wal;
        return 0;
err_ret:
        return ret;
}

/* Tear down WAL segment state. Currently a no-op: the wal structure allocated
 * in lsv_wal2_segment_init is NOT freed here — NOTE(review): possible leak;
 * confirm whether destroy is expected to release wbuf->wal and its segments. */
int lsv_wal2_segment_destroy(lsv_volume_proto_t *lsv_info) {
        (void) lsv_info;
        return 0;
}

/**
 * Build a new WAL segment wrapping `count` pre-allocated chunk ids.
 *
 * @param lsv_info       unused (kept for interface symmetry)
 * @param count          number of chunks; must equal WAL_SEGMENT_CHUNK_NUM
 * @param new_chunk_ids  chunk ids to wrap, one per chunk slot
 * @param _segment       out: newly allocated segment; NULL on failure
 * @return 0 on success, error code otherwise
 */
int lsv_wal2_segment_new(lsv_volume_proto_t *lsv_info, uint32_t count, uint32_t *new_chunk_ids,
                         lsv_wbuf_wal_segment_t **_segment) {
        int ret;
        lsv_wbuf_wal_chunk_t *chunk;
        lsv_wbuf_wal_segment_t *segment;

        *_segment = NULL;

        (void) lsv_info;
        ret = ymalloc((void **)&segment, sizeof(lsv_wbuf_wal_segment_t));
        if(unlikely(ret))
                GOTO(err_ret, ret);

        // BUG FIX: the original relied on ymalloc returning zeroed memory for
        // the `segment->chunk_num++` below; zero the struct explicitly so the
        // counter (and list hook state) can never start from garbage.
        memset(segment, 0, sizeof(*segment));

        for (uint32_t i = 0; i < count; ++i) {
                // allocate and initialize one chunk descriptor
                ret = ymalloc((void **)&chunk, sizeof(lsv_wbuf_wal_chunk_t));
                if (unlikely(ret))
                        YASSERT(0);

                chunk->idx = i;
                chunk->chunk_id = new_chunk_ids[i];
                chunk->next_page = 0;
                chunk->max_lsn = 0;

                segment->chunks[i] = chunk;
                segment->chunk_num++;
        }

        YASSERT(segment->chunk_num == WAL_SEGMENT_CHUNK_NUM);

        // fresh segment starts writing at chunk 0 with no committed LSN
        segment->curr_chunk = 0;
        segment->max_lsn = 0;

        *_segment = segment;

        return 0;
err_ret:
        return ret;
}

// int lsv_wal2_segment_init();
// int lsv_wal2_segment_destroy();

/**
 * Allocate a batch of `count` storage chunks, wrap them into a new WAL
 * segment, and append the segment to the free list.
 */
int lsv_wal2_segment_malloc(lsv_volume_proto_t *lsv_info, uint32_t count) {
        int ret;
        uint32_t *chunk_ids = NULL;
        lsv_wbuf_wal_segment_t *segment;
        lsv_wbuf_t *wbuf = lsv_info->wbuf;

        YASSERT(count == WAL_SEGMENT_CHUNK_NUM);

        ret = ymalloc((void **)&chunk_ids, count * sizeof(uint32_t));
        if (unlikely(ret))
                YASSERT(0);

        // reserve `count` chunks from the storage layer in one batch
        ret = lsv_volume_chunk_malloc_batch(lsv_info, LSV_WBUFFER_STORAGE_TYPE, count, chunk_ids);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = lsv_wal2_segment_new(lsv_info, count, chunk_ids, &segment);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        count_list_add_tail(&segment->hook, &wbuf->wal->free_list);

        DINFO("wait %u free %u\n", wbuf->wal->wait_list.count, wbuf->wal->free_list.count);

        yfree((void **)&chunk_ids);
        return 0;
err_ret:
        yfree((void **)&chunk_ids);
        return ret;
}

/* Release WAL segments. Currently a stub that always succeeds.
 * IDIOM FIX: `()` declares an unspecified parameter list in C; `(void)`
 * makes the empty parameter list explicit. */
int lsv_wal2_segment_free(void) {
        return 0;
}

/* Sum the unused bytes (whole pages past each chunk's water mark) across
 * every chunk of the segment. Result is always page-aligned. */
static inline uint32_t lsv_wal2_segment_get_free(lsv_wbuf_wal_segment_t *segment) {
        uint32_t free_bytes = 0;

        for (int i = 0; i < segment->chunk_num; i++) {
                lsv_wbuf_wal_chunk_t *chunk = segment->chunks[i];
                free_bytes += (LSV_WBUF_PAGE_NUM - chunk->next_page) * LSV_PAGE_SIZE;
        }

        YASSERT(free_bytes % LSV_PAGE_SIZE == 0);
        return free_bytes;
}

/**
 * Advance segment->curr_chunk past any full chunks and return the first
 * writable chunk. Asserts if every chunk in the segment is full.
 */
int lsv_wal2_segment_next_chunk(lsv_wbuf_wal_segment_t *segment, lsv_wbuf_wal_chunk_t **_wchunk) {
        lsv_wbuf_wal_chunk_t *candidate = NULL;

        *_wchunk = NULL;

        for (int tries = 0; tries < segment->chunk_num; tries++) {
                candidate = segment->chunks[segment->curr_chunk];
                if (!_chunk_is_full(candidate)) {
                        DINFO_NOP("chunk %u ok\n", segment->curr_chunk);
                        break;
                }
                DINFO_NOP("chunk %u full\n", segment->curr_chunk);
                segment->curr_chunk = (segment->curr_chunk + 1) % segment->chunk_num;
        }

        YASSERT(candidate != NULL && !_chunk_is_full(candidate));

        *_wchunk = candidate;
        return 0;
}

/**
 * Ensure the free list is non-empty, allocating a new segment under the
 * malloc lock if needed (double-checked: re-test after taking the lock).
 * The refreshed chunk layout is then persisted via _encode_segment_data.
 * `n` is currently ignored; one WAL_SEGMENT_CHUNK_NUM segment is allocated.
 */
int lsv_wal2_segment_ensure_free_list(lsv_volume_proto_t *lsv_info, int n) {
        int ret;
        lsv_wbuf_t *wbuf = lsv_info->wbuf;
        lsv_wbuf_wal_t *wal = wbuf->wal;

        (void) n;
        if (list_empty_careful(&wal->free_list.list)) {
                // cannot GC proactively here: no LSN information available

                lsv_wrlock(&wal->malloc_rwlock);
                if (list_empty_careful(&wal->free_list.list)) {
                        // TODO: may yield, which can break WAL LSN ordering.
                        // plock cannot guarantee the wakeup order of tasks.
                        ret = lsv_wal2_segment_malloc(lsv_info, WAL_SEGMENT_CHUNK_NUM);
                        if (unlikely(ret))
                                YASSERT(0);

                        YASSERT(!list_empty_careful(&wal->free_list.list));

                        _encode_segment_data(lsv_info);

                        DINFO("wait %u free %u\n", wal->wait_list.count, wal->free_list.count);
                }
                lsv_unlock(&wal->malloc_rwlock);
        }

        return 0;
}

/**
 * Check whether the head segment of the free list can hold `size` bytes
 * (rounded up to whole pages). Returns 1 and sets *_segment on success.
 * Returns 0 when there is no room; in that case the head segment (if any)
 * is retired to the wait list, i.e. marked full.
 */
int lsv_wal2_segment_check(lsv_volume_proto_t *lsv_info, int size, lsv_wbuf_wal_segment_t **_segment) {
        lsv_wbuf_t *wbuf = lsv_info->wbuf;
        lsv_wbuf_wal_t *wal = wbuf->wal;
        lsv_wbuf_wal_segment_t *head_seg;
        struct list_head *first;

        if (_segment)
                *_segment = NULL;

        YASSERT(size > 0);

        // round the request up to a whole number of pages (4K tail padding)
        uint32_t padded = size;
        if (size % LSV_PAGE_SIZE != 0)
                padded += LSV_PAGE_SIZE - size % LSV_PAGE_SIZE;

        uint32_t avail = 0;
        if (!list_empty_careful(&wal->free_list.list)) {
                first = wal->free_list.list.next;
                head_seg = list_entry(first, lsv_wbuf_wal_segment_t, hook);
                avail = lsv_wal2_segment_get_free(head_seg);
                if (avail >= padded) {
                        if (_segment)
                                *_segment = head_seg;
                        return 1;
                }

                // not enough room: retire this segment to the wait list
                count_list_del_init(first, &wal->free_list);
                count_list_add_tail(first, &wal->wait_list);

                for (int i = 0; i < head_seg->chunk_num; i++) {
                        DINFO("segment %p chunk %p %u %u page %u\n",
                              head_seg,
                              head_seg->chunks[i],
                              head_seg->chunks[i]->idx,
                              head_seg->chunks[i]->chunk_id,
                              head_seg->chunks[i]->next_page);
                }
        }

        DWARN("free size %u raw size %u need size %u\n", avail, size, padded);
        return 0;
}

#if 0
int lsv_wal2_segment_check_with_malloc(lsv_volume_proto_t *lsv_info, int size, lsv_wbuf_wal_segment_t **segment) {
        while (1) {
                if (lsv_wal2_segment_check(lsv_info, size, segment)) {
                        break;
                }

                lsv_wal2_segment_ensure_free_list(lsv_info, 10);
        }

        YASSERT(*segment != NULL);
        return 0;
}
#endif

int lsv_wal2_segment_append_prepare(lsv_volume_proto_t *lsv_info, const io_t *io, wal_io_head_t *head) {
        int ret;
        lsv_wbuf_t *wbuf = lsv_info->wbuf;
        lsv_wbuf_wal_segment_t *segment;
        lsv_wbuf_wal_chunk_t *chunk;
        uint64_t off;
        int left;
        uint32_t step, free_size;
        wal_io_frag_t *frag;

        head->magic = WAL_HEAD_MAGIC;
        head->version = WAL_HEAD_VERSION;
        head->head_len = wal_io_head_len();
        head->crc = 0;

        off = io->offset;

        // add header, tail padding
        left = head->head_len + io->size;

        // check segment has enough free space,
        // if not, switch to next segment
        // 多任务进入时（并发），如果第一个任务发生切换，
        // 而顺序一致性， 在多个并发任务之间，不能保证进入序。

        // 提到更外层处理
        int wal_is_ok = lsv_wal2_segment_check(lsv_info, left, &segment);
        YASSERT(wal_is_ok == 1);

        // TODO 分配页

        // TODO 分配LSN和预分配位置，中间不能有yield，避免乱序
        // head->lsn = io->lsn;
        head->lsn = ++wbuf->last_lsn;
        head->lba = io->offset;
        head->size = io->size;
        head->frag_num = 0;

        segment->max_lsn = head->lsn;

        DINFO_NOP("lsn %llu segment %p off %llu left %u = (%u + %u)\n", (LLU)segment->max_lsn, segment,
              (LLU)off, left, head->head_len, io->size);
        while(left > 0) {
                // lock free, wait free, no yield, no page alignment
                ret = lsv_wal2_segment_next_chunk(segment, &chunk);
                if (unlikely(ret))
                        YASSERT(0);

                free_size = (LSV_WBUF_PAGE_NUM - chunk->next_page) * LSV_PAGE_SIZE;
                YASSERT(free_size >= LSV_PAGE_SIZE);

                step = _min(left, free_size);

                // new io frag
                frag = &head->frags[head->frag_num];

                frag->off = off;
                frag->size = step;
                // TODO first frag + header

                // where
                frag->chunk_idx = chunk->idx;
                frag->chunk_id = chunk->chunk_id;
                frag->page_idx = chunk->next_page;
                frag->page_num = _page_num(step);

                DINFO_NOP("left %u free_size %u step %u\n", left, free_size, step);
                DINFO_NOP("lsn %llu frag %u chunk %u %u page %u %u off %llu %u\n", (LLU)head->lsn, head->frag_num,
                      frag->chunk_idx, frag->chunk_id, frag->page_idx, frag->page_num,
                      (LLU)frag->off, frag->size);

                // when LSV_WBUF_PAGE_NUM, chunk is full
                chunk->max_lsn = head->lsn;
                chunk->next_page += _page_num(step);
                YASSERT(chunk->next_page <= LSV_WBUF_PAGE_NUM);

                head->frag_num++;

                // next IO
                left -= step;
                off += step;

                // 支持正常写入，或螺旋写入
                if (lsv_feature & LSV_FEATURE_SPIRAL_WRITE) {
                        // switch to next chunk
                        segment->curr_chunk = (segment->curr_chunk + 1) % segment->chunk_num;
                } else {
                        // segment->curr_chunk = (segment->curr_chunk + 1) % segment->chunk_num;
                }
        }

        YASSERT(left == 0);
        YASSERT(head->frag_num <= IO_FRAG_MAX);

        return 0;
}

/**
 * Reclaim WAL segments whose records are all durable at @lsn.
 *
 * Walks wal->wait_list in order; each segment with max_lsn < lsn has its
 * chunks reset and is moved to the tail of wal->free_list.  The walk stops
 * at the first segment that is not yet fully committed — the wait list is
 * LSN-ordered, so later segments cannot be reclaimable either (this
 * ordering is also what the *commit_lsn monotonicity assert checks).
 *
 * @param lsv_info   volume the WAL belongs to
 * @param lsn        durable point: segments with max_lsn < lsn are reclaimed
 * @param commit_lsn out: max_lsn of the last reclaimed segment, 0 if none
 * @return number of segments moved to the free list
 */
int lsv_wal2_segment_gc(lsv_volume_proto_t *lsv_info, uint64_t lsn, uint64_t *commit_lsn) {
        int gc_count = 0;
        struct list_head *pos, *n;
        lsv_wbuf_t *wbuf = lsv_info->wbuf;
        lsv_wbuf_wal_t *wal = wbuf->wal;
        lsv_wbuf_wal_segment_t *segment;

        *commit_lsn = 0;

        DINFO("lsn %llu\n", (LLU)lsn);
        list_for_each_safe(pos, n, &wal->wait_list.list) {
                segment = list_entry(pos, lsv_wbuf_wal_segment_t, hook);
                // TODO: the wbuf WAL is written out of order, so a LOG commit
                // point does not guarantee that every LSN before it has been
                // committed; reclaiming under that assumption is a latent risk.
                if (segment->max_lsn < lsn) {
                        // committed, can gc
                        // segment->is_commit = TRUE;

                        // NOTE: !!!reset segment and add to free list tail
                        for (int i=0; i < segment->chunk_num; i++) {
                                segment->chunks[i]->next_page = 0;
                                // segment->chunks[i]->in_use = 0;
                        }
                        segment->curr_chunk = 0;

                        count_list_del_init(pos, &wal->wait_list);
                        count_list_add_tail(pos, &wal->free_list);

                        gc_count++;
                        // max_lsn must be non-decreasing along the wait list
                        if (*commit_lsn != 0) {
                                YASSERT(*commit_lsn <= segment->max_lsn);
                        }
                        *commit_lsn = segment->max_lsn;

                        DINFO("segment %p lsn %llu < %llu %s\n",
                              segment,
                              (LLU)segment->max_lsn,
                              (LLU)lsn,
                              segment->max_lsn < lsn ? "COMMIT" : "UNCOMMIT");
                } else {
#if 1
                        // wait list is LSN-ordered: nothing further is reclaimable
                        break;
#else
                        DINFO("segment %p lsn %llu < %llu %s\n",
                              segment,
                              (LLU)segment->max_lsn,
                              (LLU)lsn,
                              segment->max_lsn < lsn ? "COMMIT" : "UNCOMMIT");
#endif
                }
        }

        return gc_count;
}

/**
 * Advance the WAL checkpoint after garbage-collecting committed segments.
 *
 * Runs segment GC up to @lsn; if any segment was reclaimed, persists a new
 * checkpoint pointing at the first chunk of the first still-live segment
 * (or, when everything committed, the head of the free list).
 *
 * @param lsv_info volume the WAL belongs to
 * @param lsn      durable point handed to lsv_wal2_segment_gc()
 * @return 0 on success, errno if the checkpoint could not be persisted
 */
int lsv_wal2_segment_commit(lsv_volume_proto_t *lsv_info, uint64_t lsn) {
        int ret;
        lsv_wbuf_t *wbuf = lsv_info->wbuf;
        lsv_wbuf_wal_t *wal = wbuf->wal;
        lsv_wbuf_wal_segment_t *segment;
        uint64_t commit_lsn = 0;
        int gc_count = 0;

        gc_count = lsv_wal2_segment_gc(lsv_info, lsn, &commit_lsn);
        if (gc_count > 0) {
                YASSERT(commit_lsn > 0 && commit_lsn < lsn);

                struct list_head *pos;
                if (list_empty_careful(&wal->wait_list.list)) {
                        // everything committed: checkpoint at the free-list head
                        YASSERT(!list_empty_careful(&wal->free_list.list));
                        pos = wal->free_list.list.next;
                } else {
                        // checkpoint at the first still-uncommitted segment
                        pos = wal->wait_list.list.next;
                }

                segment = list_entry(pos, lsv_wbuf_wal_segment_t, hook);

                YASSERT(segment->chunk_num > 0);

                /*
                 * TODO: recovery scans start at segment boundaries, so the
                 * checkpoint must be segment-aligned as well, to avoid the
                 * problems a commit point in the middle of a segment causes.
                 * Use the first chunk of the first uncommitted segment.
                 */
                // BUGFIX: the result of the checkpoint write used to be
                // ignored; a failed write would silently leave a stale
                // checkpoint on disk while the segments were already recycled.
                ret = _encode_checkpoint(lsv_info, segment->chunks[0]->chunk_id, commit_lsn);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                // redo io which io_lsn > lsn
                _print_list(wal);
        }

        DINFO_NOP("free %u wait %u\n", wal->free_list.count, wal->wait_list.count);
        return 0;
err_ret:
        return ret;
}

/**
 * Rebuild the in-memory WAL state at volume load time: read the on-disk
 * checkpoint, then replay the segment data it points at.
 *
 * @param lsv_info volume whose WAL is being recovered
 * @return always 0
 */
int lsv_wal2_segment_load(lsv_volume_proto_t *lsv_info) {
        checkpoint_t cp;
        lsv_wbuf_t *wbuf = lsv_info->wbuf;

        // the wbuf WAL must already have been set up by the caller
        YASSERT(wbuf->wal != NULL);

        _decode_checkpoint(lsv_info, &cp);
        _decode_segment_data(lsv_info, &cp);

        return 0;
}

/* Per-frag context for __do_append().  In parallel mode one is
 * heap-allocated per frag and linked onto the forking task's list;
 * in serial mode a stack instance is used with only the lower
 * fields (lsv_info..buf) and retval filled in. */
typedef struct {
        struct list_head hook;          // link into the fork/join task_list (parallel only)
        task_t *task;                   // task to resume when the last frag finishes (parallel only)
        int *count;                     // shared outstanding-frag counter (parallel only)
        int parallel;                   // TRUE: resume task when *count reaches 0

        lsv_volume_proto_t *lsv_info;   // target volume
        wal_io_head_t *ioctx;           // whole-record header (used for logging)
        int idx;                        // frag index within the record (used for logging)
        wal_io_frag_t *frag;            // frag placement: chunk id, page index/count
        void *buf;                      // this frag's payload within the record buffer
        // output
        int retval;                     // frag write result, 0 on success
} append_frag_ctx_t;

/*
 * Write one WAL frag to its chunk.  In parallel mode this runs on a forked
 * task: it records its result, decrements the shared outstanding counter,
 * and resumes the forking task once the counter hits zero.
 */
static void __do_append(void *arg) {
        append_frag_ctx_t *ctx = arg;
        lsv_volume_proto_t *lsv_info = ctx->lsv_info;
        wal_io_frag_t *frag = ctx->frag;
        lsv_volume_io_t vio;
        int ret;

        ANALYSIS_BEGIN(0);

        lsv_info->wbuf_write_count++;

        lsv_volume_io_init(&vio,
                           frag->chunk_id,
                           frag->page_idx * LSV_PAGE_SIZE,
                           frag->page_num * LSV_PAGE_SIZE,
                           LSV_WBUFFER_STORAGE_TYPE);
        ret = lsv_volume_chunk_update(lsv_info, &vio, ctx->buf);
        if (unlikely(ret)) {
                // TODO core
                DERROR("ret %d\n", ret);
        }

        DINFO_NOP("lsn %llu frag %u/%u chunk %u page %u %u off %llu %u\n",
              (LLU)ctx->ioctx->lsn,
              ctx->idx,
              ctx->ioctx->frag_num,
              frag->chunk_id,
              frag->page_idx,
              frag->page_num,
              (LLU)frag->off,
              frag->size);

        // retval must be stored BEFORE the counter is decremented: the
        // resumed task reads it as soon as the counter reaches zero
        ctx->retval = ret;
        if (ctx->parallel) {
                YASSERT(*ctx->count > 0);
                (*ctx->count)--;
                if (*ctx->count == 0) {
                        // last frag done: wake the forking task (join point)
                        schedule_resume(ctx->task, 0, NULL);
                }
        }

        ANALYSIS_END(0, IO_WARN, "wbuf_append_frag");
}

/**
 * Append one serialized WAL record using a fork/join parallel pattern.
 *
 * A record with a single frag is written inline on the current task; a
 * multi-frag record forks one task per frag via __do_append() and yields
 * until the last frag completes.
 *
 * @param lsv_info volume the WAL belongs to
 * @param buffer   serialized record: wal_io_head_t followed by frag payloads
 * @return 0 on success; EIO if any frag write failed, or a scheduler errno
 * @sa __chunk_write_commit_raw@chunk_proto.c
 */
int lsv_wal2_segment_append(lsv_volume_proto_t *lsv_info, char *buffer) {
        int ret, off = 0, count = 0;
        wal_io_head_t *ioctx = (void*)buffer;
        wal_io_frag_t *frag;
        append_frag_ctx_t *ctx;
        struct list_head task_list;
        task_t task;
        int parallel;

        DINFO_NOP("lsn %llu off %llu size %u\n", (LLU)ioctx->lsn, (LLU)ioctx->lba, ioctx->size);

        parallel = ioctx->frag_num > 1 ? TRUE : FALSE;

        // fork
        if (parallel) {
                INIT_LIST_HEAD(&task_list);
                task = schedule_task_get();
        }

#if ENABLE_PROFILE
        struct timeval t1, t2;
        _gettimeofday(&t1, NULL);
#endif

        for (int i=0; i < ioctx->frag_num; i++) {
                frag = &ioctx->frags[i];
                // NOTE: frag sizes are not forced to page alignment here;
                // the frag builder already sized page_num via _page_num()

                if (parallel) {
                        ret = ymalloc((void **)&ctx, sizeof(append_frag_ctx_t));
                        if (unlikely(ret))
                                YASSERT(0);

                        count++;

                        ctx->parallel = TRUE;
                        ctx->lsv_info = lsv_info;
                        ctx->ioctx = ioctx;
                        ctx->idx = i;
                        ctx->frag = frag;
                        ctx->buf = (void *)buffer + off;
                        ctx->task = &task;
                        ctx->count = &count;

                        list_add_tail(&ctx->hook, &task_list);

                        schedule_task_new("wbuf_append_frag", __do_append, ctx, -1);
                } else {
                        append_frag_ctx_t local_ctx;
                        local_ctx.parallel = FALSE;
                        local_ctx.lsv_info = lsv_info;
                        local_ctx.ioctx = ioctx;
                        local_ctx.idx = i;
                        local_ctx.frag = frag;
                        local_ctx.buf = (void *)buffer + off;
                        __do_append(&local_ctx);
                        // BUGFIX: the serial path used to ignore the frag
                        // write result and always report success
                        if (unlikely(local_ctx.retval)) {
                                ret = local_ctx.retval;
                                GOTO(err_ret, ret);
                        }
                }

                off += frag->size;
        }

        if (parallel) {
                // join: wait for the last forked frag to resume us
                ret = schedule_yield(__FUNCTION__, NULL, NULL);
                if (unlikely(ret))
                        // NOTE(review): ctxs are intentionally NOT freed here;
                        // forked tasks may still reference them, and freeing
                        // would risk use-after-free.  A leak is the safer failure.
                        GOTO(err_ret, ret);

                int success = 0;
                struct list_head *pos, *n;
                list_for_each_safe(pos, n, &task_list) {
                        ctx = list_entry(pos, append_frag_ctx_t, hook);
                        if (ctx->retval == 0)
                                success += 1;
                        list_del_init(pos);
                        // BUGFIX: each per-frag ctx was leaked (one ymalloc
                        // per frag per append, never released)
                        yfree((void **)&ctx);
                }

                if (success < ioctx->frag_num) {
                        // TODO wal io error
                        ret = EIO;
                        GOTO(err_ret, ret);
                }
        }

#if ENABLE_PROFILE
        _gettimeofday(&t2, NULL);
        DINFO_NOP("vol %llu lsn %llu off %llu size %u parallel %d frag %u time %llu\n",
               (LLU)lsv_info->ino,
               (LLU)ioctx->lsn,
               (LLU)ioctx->lba,
               ioctx->size,
               parallel,
               ioctx->frag_num,
               (LLU)_time_used(&t1, &t2));
#endif

        return 0;
err_ret:
        return ret;
}
